package ltd.nullpointer.tcp.server.netty;

import com.boot2.core.dto.Uplink;
import com.boot2.core.utils.BeanUtils;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import ltd.nullpointer.tcp.core.SyncFuture;
import ltd.nullpointer.tcp.core.constant.TCPEnum;
import ltd.nullpointer.tcp.core.exception.MessageResolverException;
import ltd.nullpointer.tcp.core.exception.NotOverridedException;
import ltd.nullpointer.tcp.core.message.EncryDecryMessage;
import ltd.nullpointer.tcp.core.message.ErrorMessage;
import ltd.nullpointer.tcp.core.message.NPiotTCPMessage;
import ltd.nullpointer.tcp.core.message.SysErrorMessage;
import ltd.nullpointer.tcp.core.util.ChannelHandlerContextUtil;

/**
 * Base server-side channel handler that hides Netty and transport details from business code.
 *
 * <p>Inbound messages are dispatched by type in {@link #channelRead}. Subclasses plug in their
 * behaviour by overriding the {@code onMessage} overloads, {@link #onConnected} and
 * {@link #onlined}.
 *
 * <p>The {@code log} and {@code sessionManager} members used below are not declared in this
 * class; presumably they are inherited from {@code AbstractChannelDuplexHandler} (confirm there).
 *
 * @author zhangweilin
 * @date 2017-12-07 14:17:08
 */
public abstract class AbstractServerChannelHandler extends AbstractChannelDuplexHandler {

	/**
	 * Product id every handshaking channel is registered under while the real product lookup
	 * (by public key) is disabled.
	 * TODO: replace with the id of the product resolved from the handshake public key.
	 */
	private static final String PLACEHOLDER_PRODUCT_ID = "ssss";

	/**
	 * Logs the new connection and forwards the event to the parent handler.
	 *
	 * @param ctx context of the channel that became active
	 * @throws Exception if the parent handler fails
	 */
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		// The ordinal is derived from the number of sessions already registered, plus this one.
		log.info("第【" + (sessionManager.sessionMap.size() + 1) + "】个设备连接进入: " + ctx);
		super.channelActive(ctx);
	}

	/**
	 * Removes the channel from the session manager, closes it and forwards the event.
	 *
	 * @param ctx context of the channel that became inactive
	 * @throws Exception if the parent handler fails
	 */
	@Override
	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
		log.debug("channelInactive");
		shutdownChannel(ctx);
		ctx.close();
		// Forward the event, mirroring channelActive, so it is not swallowed by this handler.
		super.channelInactive(ctx);
	}

	/**
	 * Dispatches an inbound message according to its type.
	 *
	 * <ul>
	 * <li>{@code SysErrorMessage}: copied into an {@code ErrorMessage} and written back to the peer.</li>
	 * <li>{@code EncryDecryMessage}: key handshake, handled by
	 * {@link #onMessage(ChannelHandlerContext, EncryDecryMessage)}; not part of business traffic.</li>
	 * <li>{@code NPiotTCPMessage}: built-in protocol handling first, then either completion of a
	 * pending synchronous request or the asynchronous
	 * {@link #onMessage(ChannelHandlerContext, NPiotTCPMessage)} callback.</li>
	 * </ul>
	 * Any other message type is ignored (and logged at debug level); it is not forwarded.
	 *
	 * @param ctx channel context the message arrived on
	 * @param msg decoded inbound message
	 * @throws Exception if one of the handlers fails
	 */
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		if (msg instanceof SysErrorMessage) {
			replyWithError(ctx, msg);
			return;
		}
		// Key handshake, not part of the business flow.
		if (msg instanceof EncryDecryMessage) {
			onMessage(ctx, (EncryDecryMessage) msg);
			return;
		}
		if (msg instanceof NPiotTCPMessage) {
			dispatchNPiotTCPMessage(ctx, (NPiotTCPMessage) msg);
			return;
		}
		log.debug("Ignoring unsupported inbound message: " + msg);
	}

	/** Copies the properties of a system error into an {@code ErrorMessage} and sends it to the peer. */
	private void replyWithError(ChannelHandlerContext ctx, Object sysError) throws Exception {
		ErrorMessage errorMessage = new ErrorMessage();
		BeanUtils.copyProperties(sysError, errorMessage);
		ctx.writeAndFlush(errorMessage);
	}

	/**
	 * Runs built-in handling for a protocol message, then completes the waiting synchronous
	 * request or invokes the asynchronous callback. A {@code MessageResolverException} is
	 * reported back to the peer as an {@code ErrorMessage}.
	 */
	private void dispatchNPiotTCPMessage(ChannelHandlerContext ctx, NPiotTCPMessage npiotTCPMessage) throws Exception {
		try {
			// Built-in (platform) handling always runs first.
			handNPiotTCPMessage(ctx, npiotTCPMessage);

			// Default (asynchronous) messages go to the callback.
			if (!Boolean.TRUE.equals(npiotTCPMessage.getIsSync())) {
				onMessage(ctx, npiotTCPMessage);
				return;
			}

			// Synchronous replies complete the future registered under the message id instead
			// of going through the callback.
			SyncFuture<NPiotTCPMessage> future = ChannelHandlerContextUtil.syncFutureMap.get(npiotTCPMessage.getMessageId());
			if (null == future) {
				// Nobody is waiting for this id; drop the reply instead of failing with a
				// NullPointerException.
				log.info("No pending sync request for messageId " + npiotTCPMessage.getMessageId()
						+ ", dropping reply: " + npiotTCPMessage);
				return;
			}
			future.setResponse(npiotTCPMessage);
		} catch (MessageResolverException e) {
			log.error("Failed to resolve inbound message: " + npiotTCPMessage, e);
			ctx.writeAndFlush(new ErrorMessage(e.getErrCode(), e.getMessage()));
		}
	}

	/**
	 * Applies the platform's built-in protocol handling to a message.
	 *
	 * <p>If the message code maps to a {@code TCPEnum.MsgCode} it is a built-in protocol message
	 * and is handled here; otherwise it is a user-defined protocol message and no platform
	 * logic is applied.
	 *
	 * @param ctx             channel context the message arrived on
	 * @param npiotTCPMessage message to handle
	 * @throws MessageResolverException (error 10011) if the message carries no message code
	 */
	private void handNPiotTCPMessage(ChannelHandlerContext ctx, NPiotTCPMessage npiotTCPMessage) {
		String rawMsgCode = npiotTCPMessage.getMsgCode();
		if (null == rawMsgCode) {
			throw new MessageResolverException(TCPEnum.ErrorCode.err10011.getErrCode(), TCPEnum.ErrorCode.err10011.getName());
		}
		String deviceId = npiotTCPMessage.getDeviceSn();
		TCPEnum.MsgCode msgCode = TCPEnum.MsgCode.getMsgCode(rawMsgCode);
		checkMessage(ctx, npiotTCPMessage);

		if (null == msgCode) {
			// User-defined protocol: the platform applies no logic of its own.
			// TODO custom handling
			log.debug("用户自定义协议");
			return;
		}

		switch (msgCode) {
		case msgCode2Online:
			// Bind the device to this channel, then notify the subclass.
			sessionManager.add(deviceId, ctx.channel());
			log.info("设备 " + deviceId + " 成功上线,上线信息: " + npiotTCPMessage);
			onlined(ctx, npiotTCPMessage);
			break;

		default:
			break;
		}
		// TODO further processing of the message
	}

	/**
	 * Hook invoked after a device has been registered as online. Does nothing by default.
	 *
	 * @param ctx             channel context of the device
	 * @param npiotTCPMessage the online message sent by the device
	 */
	protected void onlined(ChannelHandlerContext ctx, NPiotTCPMessage npiotTCPMessage) {

	}

	/**
	 * Basic validation of an inbound message. Currently performs no checks.
	 *
	 * <p>A previous, now disabled, online check verified that the device belongs to the product
	 * bound to the channel (error 10017) and rejected messages from devices that had not come
	 * online yet (error 10012).
	 *
	 * @param ctx             channel context the message arrived on
	 * @param npiotTCPMessage message to validate
	 */
	private void checkMessage(ChannelHandlerContext ctx, NPiotTCPMessage npiotTCPMessage) {

	}

	/**
	 * Callback for asynchronous protocol messages. Does nothing by default.
	 *
	 * @param ctx2            channel context the message arrived on
	 * @param npiotTCPMessage the received message
	 */
	protected void onMessage(ChannelHandlerContext ctx2, NPiotTCPMessage npiotTCPMessage) {
	}

	/**
	 * Handles the key handshake: registers the channel with the session manager and invokes
	 * {@link #onConnected(ChannelHandlerContext)}.
	 *
	 * <p>NOTE: the original negotiation is disabled. It looked up the product by the public key
	 * carried in the handshake (replying with error 10001 when none was found), registered the
	 * channel under that product's id, and answered with a freshly generated AES key encrypted
	 * either with the client's RSA public key or with the product's AES key. Until it is
	 * restored, the handshake message content is not inspected and every channel is registered
	 * under a placeholder product id.
	 *
	 * @param ctx               channel context performing the handshake
	 * @param encryDecryMessage handshake message (currently not inspected)
	 * @throws Exception if the handshake fails
	 */
	protected void onMessage(ChannelHandlerContext ctx, EncryDecryMessage encryDecryMessage) throws Exception {
		log.debug("ctx.channel()0 : " + ctx.channel());
		sessionManager.addByProductId(PLACEHOLDER_PRODUCT_ID, ctx.channel());
		onConnected(ctx);
	}

	/**
	 * Hook invoked after a successful key handshake. Does nothing by default.
	 *
	 * @param ctx channel context that completed the handshake
	 */
	protected void onConnected(ChannelHandlerContext ctx) {
	}

	/**
	 * Handles an uplink message. Subclasses that receive uplinks must override this method.
	 *
	 * @param ctx    channel context the uplink arrived on
	 * @param uplink the uplink payload
	 * @throws NotOverridedException always, unless overridden
	 */
	protected void onMessage(ChannelHandlerContext ctx, Uplink uplink) {
		throw new NotOverridedException("方法未实现，uplink: " + uplink);
	}

	/**
	 * Logs an exception raised in the pipeline. The channel is not closed here.
	 *
	 * @param ctx   channel context the exception was raised on
	 * @param cause the exception
	 * @throws Exception never thrown by this implementation
	 */
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		log.error("Command channel handler exception", cause);
	}

	// NOTE: idle-state handling (userEventTriggered) is currently disabled. It used to remove the
	// session and close the channel when the reader side became idle.

	/**
	 * Called when this handler is added to a pipeline. Currently does nothing; the connection
	 * counting that used to live here is disabled.
	 *
	 * @param ctx context this handler was added with
	 * @throws Exception never thrown by this implementation
	 */
	@Override
	public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
	}

	/**
	 * Removes the channel from the session manager's cache. Invoked when the channel goes
	 * inactive.
	 *
	 * @param ctx context of the channel being shut down
	 */
	protected void shutdownChannel(ChannelHandlerContext ctx) {
		sessionManager.remove(ctx.channel());
	}

}
